跳到主要内容

Spark 快速开始

配置

要在Spark中使用LakeSoul,请首先配置Spark Catalog。LakeSoul使用Apache Spark的DataSourceV2 API来实现数据源和目录。此外,LakeSoul还提供了 Scala 的表API,以扩展LakeSoul数据表的功能。

Spark 3 Support Matrix

LakeSoulSpark Version
2.2.x-2.4.x3.3.x
2.0.x-2.1.x3.1.x

Spark Shell/SQL/PySpark

使用LakeSoulSparkSessionExtension sql扩展来运行spark-shell/spark-sql/pyspark。

spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul  --jars lakesoul-spark-spark-3.3-2.6.0.jar

Maven 项目依赖配置

<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-spark</artifactId>
<version>3.3-2.6.0</version>
</dependency>
// Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.lakesoul.LakeSoulOptions
import spark.implicits._
import com.dmetasoul.lakesoul.tables.LakeSoulTable


val builder = SparkSession.builder()
.master("local")
.config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
.config("spark.sql.defaultCatalog", "lakesoul")
val spark = builder.getOrCreate()

创建命名空间

首先,为LakeSoul表创建一个namespace,如果不创建将使用默认的namespace,LakeSoul Catalog的默认namespace是default

CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace;
USE lakesoul_namespace;
SHOW TABLES;

创建表

使用USING lakesoul的子句创建一个分区的LakeSoul表,或使用DataFrameWriterV2 API,第一次写入时自动创建相应的LakeSoul表。

CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) 
USING lakesoul
PARTITIONED BY (`date`)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table';

主键表

在LakeSoul中,带有主键的表被定义为哈希分区表。使用USING lakesoul子句,并结合TBLPROPERTIES设置(其中'hashPartitions'指定以逗号分隔的主键列表,'hashBucketNum'指定哈希桶的大小),可以创建一个哈希分区的LakeSoul表。

CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) 
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table'
TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2');

主键 CDC 表

哈希分区的LakeSoul表具有可选的数据变更捕获(CDC)功能,能够记录数据的变化。要创建支持CDC的LakeSoul表,可以在哈希分区表的DDL语句中添加额外的TBLPROPERTIES设置,指定'lakesoul_cdc_change_column'属性。这个属性定义了一个隐式列,帮助表有效地处理CDC信息,从而实现对数据变更的精确追踪和管理。

CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) 
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table'
TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op');

数据插入/合并

要使用Spark SQL向非哈希分区表写入数据,请使用INSERT INTO语句。

要使用DataFrame向表写入数据,请使用DataFrameWriterV2 API。如果这是对该表的第一次写入,它还将自动创建相应的LakeSoul表。

INSERT INTO TABLE lakesoul_table VALUES (1, 'Alice', '2024-01-01'), (2, 'Bob', '2024-01-01'), (1, 'Cathy', '2024-01-02');

要使用Spark SQL向哈希分区表写入数据,请使用Merge INTO语句。

要使用DataFrame向哈希分区表写入数据,请使用LakeSoulTableupsert API。

CREATE OR REPLACE VIEW spark_catalog.default.source_view (id , name, date)
AS SELECT 1L as `id`, 'George' as `name`, '2024-01-01' as `date`;


MERGE INTO lakesoul_hash_table AS t
USING spark_catalog.default.source_view AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

数据更新

LakeSoul表可以通过DataFrame或使用标准的UPDATE语句进行更新。要使用DataFrame更新表中的数据,请使用LakeSoulTableupdateExpr API。

UPDATE lakesoul_table SET name = 'David' WHERE id = 2;

数据删除

LakeSoul表可以通过DataFrame或使用标准的DELETE语句来删除记录。要使用DataFrame从表中删除数据,请使用LakeSoulTabledelete API。

DELETE FROM lakesoul_table WHERE id =1;

数据查询

LakeSoul表可以使用DataFrame或Spark SQL进行查询。

SELECT * FROM lakesoul_table;

Time Travel查询

LakeSoul支持Time Travel查询,可以查询历史上任何时间点的表或两个提交时间之间的更改数据。

// Scala
val tablePath = "file:/tmp/lakesoul_namespace/cdc_table"
Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "hash2", "insert"), ("range4", "hash2", "insert"), ("range4", "hash4", "insert"), ("range3", "hash3", "insert"))
.toDF("range", "hash", "op")
.write
.mode("append")
.format("lakesoul")
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", "2")
.option("shortTableName", "cdc_table")
.option("lakesoul_cdc_change_column", "op")
.save(tablePath)
// record the version of 1st commit
import java.text.SimpleDateFormat

val versionA: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)


val lakeTable = LakeSoulTable.forPath(tablePath)
lakeTable.upsert(Seq(("range1", "hash1-1", "delete"), ("range2", "hash2-10", "delete"))
.toDF("range", "hash", "op"))
// record the version of 2nd commit
val versionB: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)

lakeTable.upsert(Seq(("range1", "hash1-13", "insert"), ("range2", "hash2-13", "update"))
.toDF("range", "hash", "op"))
lakeTable.upsert(Seq(("range1", "hash1-15", "insert"), ("range2", "hash2-15", "update"))
.toDF("range", "hash", "op"))
// record the version of 3rd,4th commits
val versionC: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)


全量查询

// Scala
spark.sql("SELECT * FROM cdc_table")

快照查询

LakeSoul支持快照查询,可用于查询历史上某一时间点的表数据。

// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range2")
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.SNAPSHOT_READ)
.load(tablePath)

增量查询

LakeSoul支持增量查询,可获得在起始时间和结束时间之间发生更改的数据记录。

// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range1")
.option(LakeSoulOptions.READ_START_TIME, versionA)
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.INCREMENTAL_READ)
.load(tablePath)

更多案例

接下来,您可以在Spark API文档中了解更多关于在Spark中使用LakeSoul表的案例。